package com.flink_demo.demo.cep.source;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import com.flink_demo.demo.cep.event.MonitorEvent;

/**
 * Flink source that emits a randomly populated {@link MonitorEvent} at a fixed
 * interval until the source is cancelled.
 *
 * <p>
 * Each event gets an id and a value drawn independently from {@code [0, 10)}
 * and an {@code fdate} holding the current wall-clock time formatted as
 * {@code yyyy-MM-dd HH:mm:ss}.
 */
public class MonitorEventSource extends RichSourceFunction<MonitorEvent> {

	private static final long serialVersionUID = 1L;

	private Random random = new Random();
	/** Pause between two emitted events, in milliseconds (passed to {@link Thread#sleep(long)}). */
	private int intval = 100;
	private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

	/**
	 * Loop guard for {@link #run}. Declared volatile because {@link #cancel()} is
	 * invoked from a different thread than the one executing {@link #run}.
	 */
	private volatile boolean running = true;

	/**
	 * Creates a source that pauses {@code intval} milliseconds between events.
	 *
	 * @param intval pause between two emitted events, in milliseconds
	 */
	public MonitorEventSource(int intval) {
		this.intval = intval;
	}

	/**
	 * Creates a source using the default pause of 100 milliseconds between events.
	 */
	public MonitorEventSource() {
	}

	/**
	 * Emits one random event per interval until {@link #cancel()} is called.
	 *
	 * @param ctx context used to emit the generated events
	 * @throws Exception if the emitting thread is interrupted while sleeping
	 */
	@Override
	public void run(SourceContext<MonitorEvent> ctx) throws Exception {
		// Re-check the flag on every iteration so cancel() can end the loop
		// cooperatively instead of relying solely on thread interruption.
		while (running) {
			MonitorEvent m = new MonitorEvent();
			m.setId(random.nextInt(10));
			m.setFdate(sdf.format(new Date()));
			m.setValue(random.nextInt(10));
			ctx.collect(m);
			System.out.println("emit:" + m);
			Thread.sleep(this.intval);
		}
	}

	/**
	 * Signals the emit loop in {@link #run} to stop after its current iteration.
	 */
	@Override
	public void cancel() {
		running = false;
	}

}
